package client

import (
	"context"
	"crypto/x509"
	"errors"
	"fmt"
	"gitee.com/zhucheer/orange/cfg"
	"gitee.com/zhucheer/orange/logger"
	"gitee.com/zhucheer/pub-connect/app/define"
	pubproto "gitee.com/zhucheer/pub-connect/app/proto"
	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"io"
	"log"
	"time"
)

type VwPubClientTsl struct {
	ServerName  string
	CertContent string
}

type VwPubClientOpts struct {
	ServerAddr    string
	AppName       string
	CodeId        int
	Version       string
	Secret        string
	SessionId     string
	NsName        string
	PodName       string
	ContainerName string
	RecvHandler   func(d *pubproto.WorkDetail) (pubproto.WorkDetail, error)
	TSL           VwPubClientTsl
	StopSign      chan int
}

// ActivePushMessageChan 主动异步推流到服务端通道
var ActivePushMessageChan = make(chan string)

// NewGrpcClient 启动GRPC客户端
func NewGrpcClient(c context.Context, conn *grpc.ClientConn, clientOpts *VwPubClientOpts) (err error) {
	if clientOpts == nil || clientOpts.AppName == "" {
		log.Fatalf("client options not found")
		return
	}
	if clientOpts.StopSign == nil {
		clientOpts.StopSign = make(chan int, 1)
	}

	if conn != nil {
		log.Println("grpc status: ", conn.GetState())
		if conn.GetState() != connectivity.Shutdown {
			// 已经是连接状态
			return errors.New("is connected")
		}
		conn.Close()
	}
	var opts []grpc.DialOption
	if clientOpts.TSL.ServerName != "" {
		cp := x509.NewCertPool()
		if !cp.AppendCertsFromPEM([]byte(clientOpts.TSL.CertContent)) {
			return fmt.Errorf("credentials: failed to append certificates")
		}

		// grpc.tsl.hostname 为ca证书签名的域名
		creds := credentials.NewClientTLSFromCert(cp, clientOpts.TSL.ServerName)
		opts = append(opts, grpc.WithTransportCredentials(creds))
	} else {
		opts = append(opts, grpc.WithInsecure())
	}

	opts = append(opts, grpc.WithBlock())
	ctx, _ := context.WithTimeout(context.TODO(), 30*time.Second)
	conn, err = grpc.DialContext(ctx, clientOpts.ServerAddr, opts...)
	if err != nil {
		fmt.Println("=====srv addr=======", clientOpts.ServerAddr)
		log.Fatalf("fail to dial: %v", err)
		return
	}

	client := pubproto.NewPubConnectClient(conn)
	log.Printf("connect <===> grpc server at serverAddr %v", clientOpts.ServerAddr)

	// 重试2次
	for i := 0; i < 3; i++ {
		err = runDoWork(c, client, clientOpts)
		if err != nil && err != context.Canceled {
			log.Printf("connect error wait 2s retry.")
			time.Sleep(2 * time.Second)
			continue
		}
		break
	}

	conn.Close()

	// 发送连接关闭事件
	clientOpts.StopSign <- 1
	fmt.Println("run do Work done")
	return
}

// 启动通信开始相关注册和双向通信流
// 客户端核心连接方法
func runDoWork(ctx context.Context, client pubproto.PubConnectClient, opts *VwPubClientOpts) (err error) {
	appName := opts.AppName
	secret := opts.Secret
	appUuid := "APP:" + uuid.New().String()

	// 注册获取命名空间
	// 初始执行注册事件
	_, err = client.Register(ctx, &pubproto.RegReq{
		AppName: appName,
		PriKey:  secret,
		Uuid:    appUuid,
	})
	if err != nil {
		log.Printf("fail to run Do Register: %v", err)
		return
	}

	// 执行双向监听stream流
	stream, err := client.DoWork(ctx)
	if err != nil {
		log.Printf("fail to run Do work: %v", err)
		return
	}

	// 双向通道连接后发送一个初始心跳消息
	err = stream.Send(&pubproto.WorkDetail{
		PriKey:  secret,
		AppName: appName,
		AppUuid: appUuid,
		Params:  opts.Version,
		Name:    define.WorkAgentHeartbeatMsg,
	})
	if err != nil {
		log.Printf("error stream.Send err:%v", err)
		return
	}

	// 主动推送监听
	go ActivePushMsgDo(ctx, appUuid, stream)

	for {
		// 持续监听服务端数据流,收到服务端消息进行后续回复动作
		in, err := stream.Recv()
		if err == io.EOF {
			// read done.
			log.Printf("read done  EOF")
			break
		}
		if err != nil {
			log.Printf("Failed to receive err %v", err)
			break
		}

		fmt.Println("=========client recv====", in.Name, in.Uuid, in.AppName)

		if in.AppName == appName {
			resp, err := *in, errors.New("sign wrong")
			if in.PriKey != secret {
				in.Status = define.StatusFailed
				log.Printf("secret err req %s, actual %s ", in.PriKey, secret)
			} else {
				log.Printf("begin handle work name: %s, uid: %s status is %s, params: %s", in.Name, in.Uuid, in.Status, in.Params)

				if in.Name == "create" && in.Status == define.StatusFailed {
					fmt.Println(in.Results)
					time.Sleep(time.Second)
					break
				}

				// 执行指定任务
				resp, err = opts.RecvHandler(in)
				if err != nil {
					log.Printf("handlerDo err: %v", err)
				}
			}

			// 执行结果向服务端推流
			resp.ResponseTime = time.Now().Unix()
			// 向服务端发送数据
			err = stream.Send(&resp)
			if err != nil {
				log.Printf("handlerDo Send err: %v", err)
			}
		}
	}

	return
}

// ActivePushMsgDo 主动往服务端发送消息
func ActivePushMsgDo(ctx context.Context, appUuid string, stream pubproto.PubConnect_DoWorkClient) {
	appName := cfg.GetString("grpc.appName", "")
	secret := cfg.GetString("grpc.secret", "")

LOOP:
	for {
		select {
		case <-ctx.Done():
			break LOOP

		case msg := <-ActivePushMessageChan:
			err := stream.Send(&pubproto.WorkDetail{
				PriKey:       secret,
				AppName:      appName,
				AppUuid:      appUuid,
				Status:       "up_call_msg",
				Results:      msg,
				ResponseTime: time.Now().Unix(),
				Name:         define.WorkActivePushMsg,
			})
			if err != nil && err != io.EOF {
				logger.Errorw("ActivePushMsgDo=>stream.Send", "err", err)
			}
		}
	}
}
